'use strict';

const Queue = require('../');

const expect = require('chai').expect;
const redis = require('ioredis');
const utils = require('./utils');
const delay = require('delay');
const sinon = require('sinon');

describe('.pause', () => {
  let client;
  beforeEach(() => {
    client = new redis();
    return client.flushdb();
  });

  afterEach(() => {
    sinon.restore();
    return client.quit();
  });

  it('should pause a queue until resumed', () => {
    let ispaused = false,
      counter = 2;

    return utils.newQueue().then(queue => {
      const resultPromise = new Promise(resolve => {
        queue.process((job, jobDone) => {
          expect(ispaused).to.be.eql(false);
          expect(job.data.foo).to.be.equal('paused');
          jobDone();
          counter--;
          if (counter === 0) {
            resolve(queue.close());
          }
        });
      });

      return Promise.all([
        queue
          .pause()
          .then(() => {
            ispaused = true;
            return queue.add({ foo: 'paused' });
          })
          .then(() => {
            return queue.add({ foo: 'paused' });
          })
          .then(() => {
            ispaused = false;
            return queue.resume();
          }),
        resultPromise
      ]);
    });
  });

  it('should be able to pause a running queue and emit relevant events', done => {
    let ispaused = false,
      isresumed = true,
      first = true;

    utils.newQueue().then(queue => {
      queue.process(job => {
        expect(ispaused).to.be.eql(false);
        expect(job.data.foo).to.be.equal('paused');

        if (first) {
          first = false;
          ispaused = true;
          return queue.pause();
        } else {
          expect(isresumed).to.be.eql(true);
          queue.close().then(done, done);
        }
      });

      queue.add({ foo: 'paused' });
      queue.add({ foo: 'paused' });

      queue.on('paused', () => {
        ispaused = false;
        queue.resume().catch((/*err*/) => {
          // Swallow error.
        });
      });

      queue.on('resumed', () => {
        isresumed = true;
      });
    });
  });

  it('should pause the queue locally', done => {
    let counter = 2;

    const queue = utils.buildQueue();

    queue
      .pause(true /* Local */)
      .then(() => {
        // Add the worker after the queue is in paused mode since the normal behavior is to pause
        // it after the current lock expires. This way, we can ensure there isn't a lock already
        // to test that pausing behavior works.
        queue
          .process((job, jobDone) => {
            expect(queue.paused).not.to.be.ok;
            jobDone();
            counter--;
            if (counter === 0) {
              queue.close().then(done);
            }
          })
          .catch(done);
      })
      .then(() => {
        return queue.add({ foo: 'paused' });
      })
      .then(() => {
        return queue.add({ foo: 'paused' });
      })
      .then(() => {
        expect(counter).to.be.eql(2);
        expect(queue.paused).to.be.ok; // Parameter should exist.
        return queue.resume(true /* Local */);
      })
      .catch(done);
  });

  it('should wait until active jobs are finished before resolving pause', done => {
    const queue = utils.buildQueue();
    const startProcessing = new Promise(resolve => {
      queue.process((/*job*/) => {
        resolve();
        return delay(200);
      });
    });

    queue.isReady().then(() => {
      const jobs = [];
      for (let i = 0; i < 10; i++) {
        jobs.push(queue.add(i));
      }
      //
      // Add start processing so that we can test that pause waits for this job to be completed.
      //
      jobs.push(startProcessing);
      Promise.all(jobs)
        .then(() => {
          return queue
            .pause(true)
            .then(() => {
              const active = queue
                .getJobCountByTypes(['active'])
                .then(count => {
                  expect(count).to.be.eql(0);
                  expect(queue.paused).to.be.ok;
                  return null;
                });

              // One job from the 10 posted above will be processed, so we expect 9 jobs pending
              const paused = queue
                .getJobCountByTypes(['delayed', 'wait'])
                .then(count => {
                  expect(count).to.be.eql(9);
                  return null;
                });
              return Promise.all([active, paused]);
            })
            .then(() => {
              return queue.add({});
            })
            .then(() => {
              const active = queue
                .getJobCountByTypes(['active'])
                .then(count => {
                  expect(count).to.be.eql(0);
                  return null;
                });

              const paused = queue
                .getJobCountByTypes(['paused', 'wait', 'delayed'])
                .then(count => {
                  expect(count).to.be.eql(10);
                  return null;
                });

              return Promise.all([active, paused]);
            })
            .then(() => {
              return queue.close().then(done, done);
            });
        })
        .catch(done);
    });
  });

  it('should pause the queue locally when more than one worker is active', () => {
    const queue1 = utils.buildQueue('pause-queue');
    const queue1IsProcessing = new Promise(resolve => {
      queue1.process((job, jobDone) => {
        resolve();
        setTimeout(jobDone, 200);
      });
    });

    const queue2 = utils.buildQueue('pause-queue');
    const queue2IsProcessing = new Promise(resolve => {
      queue2.process((job, jobDone) => {
        resolve();
        setTimeout(jobDone, 200);
      });
    });

    queue1.add(1);
    queue1.add(2);
    queue1.add(3);
    queue1.add(4);

    return Promise.all([queue1IsProcessing, queue2IsProcessing]).then(() => {
      return Promise.all([
        queue1.pause(true /* local */),
        queue2.pause(true /* local */)
      ]).then(() => {
        const active = queue1.getJobCountByTypes(['active']).then(count => {
          expect(count).to.be.eql(0);
        });

        const pending = queue1.getJobCountByTypes(['wait']).then(count => {
          expect(count).to.be.eql(2);
        });

        const completed = queue1
          .getJobCountByTypes(['completed'])
          .then(count => {
            expect(count).to.be.eql(2);
          });

        return Promise.all([active, pending, completed]).then(() => {
          return Promise.all([queue1.close(), queue2.close()]);
        });
      });
    });
  });

  it('should wait for blocking job retrieval to complete before pausing locally', () => {
    const queue = utils.buildQueue();

    const startsProcessing = new Promise(resolve => {
      queue.process((/*job*/) => {
        resolve();
        return delay(200);
      });
    });

    return queue
      .add(1)
      .then(() => {
        return startsProcessing;
      })
      .then(() => {
        return queue.pause(true);
      })
      .then(() => {
        return queue.add(2);
      })
      .then(() => {
        const active = queue.getJobCountByTypes(['active']).then(count => {
          expect(count).to.be.eql(0);
        });

        const pending = queue.getJobCountByTypes(['wait']).then(count => {
          expect(count).to.be.eql(1);
        });

        const completed = queue
          .getJobCountByTypes(['completed'])
          .then(count => {
            expect(count).to.be.eql(1);
          });

        return Promise.all([active, pending, completed]).then(() => {
          return queue.close();
        });
      });
  });

  it('should not initialize blocking client if not already initialized', async () => {
    const createClient = sinon.spy(() => client);
    const queue = utils.buildQueue('pause-queue', { createClient });

    await queue.pause(true);
    const bClientCalls = createClient
      .getCalls()
      .filter(c => c.args[0] === 'bclient');
    expect(bClientCalls).to.have.lengthOf(0);
  });

  it('pauses fast when queue is drained', function(done) {
    this.timeout(10000);
    const queue = new Queue('test');

    queue.process((/*job*/) => {
      Promise.resolve();
    });

    queue.add({});

    queue.on('drained', () => {
      delay(500).then(() => {
        const start = new Date().getTime();
        return queue.pause(true).finally(() => {
          const finish = new Date().getTime();
          expect(finish - start).to.be.lt(1000);
          queue.close().then(done, done);
        });
      });
    });
  });

  it('should not processed delayed jobs', function(done) {
    this.timeout(5000);
    const queue = new Queue('pause-test');

    queue.process(() => {
      done(new Error('should not process delayed jobs in paused queue.'));
    });

    queue.pause().then(() => {
      queue
        .add(
          {},
          {
            delay: 500
          }
        )
        .then(() => {
          return queue.getJobCounts();
        })
        .then(counts => {
          expect(counts).to.have.property('paused', 0);
          expect(counts).to.have.property('waiting', 0);
          expect(counts).to.have.property('delayed', 1);
          return delay(1000);
        })
        .then(() => {
          return queue.getJobCounts();
        })
        .then(counts => {
          expect(counts).to.have.property('paused', 1);
          expect(counts).to.have.property('waiting', 0);
          done();
        });
    });
  });
});
